package com.amon.amonplayer.rxjava;

import com.amon.amonplayer.rxjava.schedulder.Schedulers;

public class ObservableSchedulers<T> extends Observable<T> {
    Observable<T> source;
    Schedulers schedulers;

    public ObservableSchedulers(Observable<T> source, Schedulers schedulers) {
        this.source = source;
        this.schedulers = schedulers;
    }

    @Override
    protected void subscribeActual(Observer<T> observer) {
        schedulers.scheduleDirect(new SchedulerTask(observer));
    }

    private class SchedulerTask implements Runnable{
        private final Observer<T> observer;

        public SchedulerTask(Observer<T> observer) {
            this.observer = observer;
        }

        @Override
        public void run() {
            source.subscribe(observer);
        }
    }
}
